Before you turn this problem in, make sure everything runs as expected. First, restart the kernel (in the menubar, select Kernel$\rightarrow$Restart) and then run all cells (in the menubar, select Cell$\rightarrow$Run All).
Make sure you fill in any place that says YOUR CODE HERE or "YOUR ANSWER HERE", as well as your name and collaborators below:
NAME = "Grupo HDFS 22"
COLLABORATORS = "Esteban Braganza y Borja López"

Esta actividad consiste en que implementéis un pequeño proyecto de análisis de datos utilizando algunas de las herramientas que nos ofrece Amazon Web Services (AWS). Se trata, pues, de una implementación libre de un proyecto en la nube de AWS.
Para hacerlo, utilizaremos una serie de recursos a los que os hemos dado acceso en la plataforma educativa AWS Academy, que son prácticamente los mismos que tenemos en la versión comercial/empresarial, pero sin que tengáis que preocuparos por daros de alta ni por los costos de su uso. A la hora de implementar el proyecto, os recomendamos maximizar el uso de los servicios PaaS y minimizar el uso de servicios IaaS. Por ejemplo, podéis utilizar los servicios S3, Lambda, Kinesis, EMR, EC2, Glue, Athena o Redshift, pero sois libres de utilizar los servicios adecuados para la recolección, almacenamiento, procesamiento y análisis de datos en función de vuestro problema. Podréis encontrar la lista de servicios disponibles dentro de la plataforma AWS Academy.
En cuanto a la evaluación, esta se realizará a partir de las rúbricas que encontraréis en este documento para cada uno de los apartados.
Antes de empezar a desarrollar el proyecto, es muy recomendable realizar el curso AWS Academy Cloud Foundations al que os hemos dado acceso en la plataforma AWS Academy. En especial, os recomendamos hacer los apartados de prácticas de laboratorio que hay, donde se aprende a trabajar con EC2, S3, Lambda, etc.
Para acceder al curso, debéis iniciar sesión en la plataforma de AWS Academy en https://www.awsacademy.com/ con vuestro correo de UOC.
Veréis que tenéis acceso al curso AWS Cloud Foundations y a un laboratorio (Learner Lab) para hacer las prácticas. Tened en cuenta que la mayoría de los espacios de AWS Academy tienen una serie de restricciones (en especial, número de créditos y tiempo disponible) y, por tanto, debéis ser muy cuidadosos al guardar y documentar cualquier configuración que hagáis en el entorno de AWS Academy. Si superáis el presupuesto del laboratorio, se desactivará la cuenta del laboratorio y se perderá todo el progreso y los recursos.
El curso es bastante extenso, pero debéis leer y hacer las prácticas que hay en los siguientes módulos:
En el caso del Learner Lab,
En este apartado, explicad y justificad los servicios de AWS que habéis seleccionado en vuestro proyecto, teniendo en cuenta los objetivos del proyecto y el tipo de datos con los que trabajáis. Describid detalladamente cómo se realizará el procesamiento de los datos en AWS. Incluid una explicación de los flujos de trabajo, las transformaciones de datos y cómo se gestionará la integración entre los diferentes servicios de AWS. Aseguraos de que la selección de los servicios se alinea con las necesidades específicas. Si es posible, se pide utilizar los iconos estándar de AWS para mostrar de forma visual vuestra propuesta de tratamiento de datos.
| Categoría | Excelente (4) | Bueno (3) | Satisfactorio (2) | Necesita Mejora (1) |
|---|---|---|---|---|
| Claridad y Precisión | La descripción y justificación de los servicios es muy clara y precisa. Toda la información es fácilmente comprensible. | La descripción es clara y precisa, con pocos errores de comprensión. | La descripción es generalmente clara, pero puede tener algunos errores u omisiones menores. | La descripción es confusa y contiene muchos errores que dificultan la comprensión. |
| Justificación de los Servicios de AWS Seleccionados | La justificación de los servicios seleccionados está bien argumentada y alineada con los objetivos del proyecto y el tipo de datos. | La justificación está bien argumentada, pero puede faltar alguna alineación clara con los objetivos o datos. | La justificación es básica y hay falta de alineación clara con los objetivos o datos. | La justificación no está argumentada ni alineada con los objetivos del proyecto ni el tipo de datos. |
| Descripción del Procesamiento de Datos | La descripción del procesamiento de los datos es detallada y cubre todos los aspectos importantes, incluidos los flujos de trabajo y las transformaciones. | La descripción es detallada pero puede faltar alguna información específica. | La descripción cubre los aspectos básicos pero faltan detalles importantes. | La descripción es inadecuada y no cubre los aspectos importantes del procesamiento de datos. |
| Integración entre Servicios de AWS | La integración entre los servicios está bien explicada y es coherente. Todos los servicios se integran de manera eficiente. | La integración está bien explicada, pero podría mejorar en coherencia. | La integración está descrita de manera básica pero faltan detalles y coherencia. | La integración no está bien explicada y los servicios no se integran de manera efectiva. |
| Presentación Visual con Iconos de AWS | Los iconos estándar de AWS están bien utilizados y la propuesta es visualmente clara y bien estructurada. | Los iconos están utilizados pero la claridad visual puede mejorar. | Los iconos están poco utilizados o mal colocados, lo que reduce la claridad visual. | Los iconos no están utilizados y la propuesta es confusa visualmente. |
Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.
En el proyecto presentado, hemos diseñado una plataforma de análisis de datos de videojuegos utilizando servicios AWS. El objetivo es llegar a analizar los datos, capturando, alamcenando, procesanso y analizando en timepo real los datos.
El esquema utilizado es el siguiente:
A continuación, detallamos los servicios de AWS seleccionados y justificamos su uso:
Nos permite capturar los datos de los juegos que obtenemos a través de aplicaciones móviles, servidores de juegos y navegadores web en tiempo real. Amazon Kinesis Data Stream es lo que necesitamos, ya que nos permite alamcenar y manejar el gran numero de eventos de los juegos que obtenemos a través de las fuentes.
Se encarga de mover los datos de Kinesis Data Streams a Amazon S3 en tiempo real, para su almacenamiento en bruto. Mediante este paso intermedio, simplificamos la entrega de datos hacía el alamcenamiento duradero, como es "Amazon simple storage Services", es decir Amaxon S3.
Descripción: Utilizamos S3 como el almacenamiento central del proyecto en tres capas:
S3 ofrece un almacenamiento económico, escalable y con alta durabilidad para nuestros datos, además la separación en capas permite un mayor control y gestión eficientes del ciclo de vida de los datos.
Se utilizan dos tipos de AWS Glue:
Glue Jobs: se encarga de llevar a cabo la extracción, transformación y carga de los datos de forma automatica de la primera capa a la segunda, en nuestro caso la capa final. Además permite realizar trasnformaciones, como la limpieza, normalización y cálculos de agregación.
Glue Crawlers: se encarga actaulizar el catalogo de datos a través de la detección automatica de los esquemas de los datos almacenados en S3, lo cual, nos permite simplificar la gestión de los datos.
Permite ejecutar consultas SQL directamente de los datos almacenados en S3 y elimina la necesidad de mover datos a una base de datps tradicional.
Permite la creación de dashboards para facilitar la visualización de datos procesados.
Realizad un conjunto de pruebas para aseguraros de que los datos se están procesando correctamente. Describid los pasos seguidos para verificar la precisión y la consistencia del procesamiento de datos. En este apartado debéis utilizar los servicios de persistencia de datos (EBS y/o S3), una máquina EC2, alguna de las bases de Amazon (RDS) y el uso de Lambda. Además, será necesario utilizar algún servicio adicional de los que ofrece Amazon y a los que tenéis acceso en el Learner Lab.
| Ítem | Excelente (4) | Bueno (3) | Satisfactorio (2) | Necesita Mejora (1) | |
|---|---|---|---|---|---|
| Descripción de los Pasos Seguidos | La descripción de los pasos seguidos es muy clara, detallada y fácilmente comprensible. | La descripción es clara y detallada con pocos errores de comprensión. | La descripción es generalmente clara, pero puede tener algunos errores u omisiones menores. | La descripción es confusa y contiene muchos errores que dificultan la comprensión. | |
| Precisión y Consistencia de los Datos | Las pruebas demuestran una precisión y consistencia excelente del procesamiento de datos con resultados fiables y consistentes. | Las pruebas demuestran una precisión y consistencia buenas con pocos errores. | Las pruebas demuestran una precisión y consistencia aceptables pero con algunos errores. | Las pruebas demuestran una falta de precisión y consistencia significativa en el procesamiento de datos. | |
| Uso de los Servicios de AWS (EBS/S3, EC2, RDS, Lambda) | El uso de los servicios de AWS está bien justificado y se utilizan de manera eficiente para el procesamiento de datos. | El uso de los servicios está bien justificado, pero puede faltar algo de eficiencia. | El uso de los servicios es aceptable pero no totalmente eficiente o justificado. | El uso de los servicios es inadecuado o no se justifica correctamente. | |
| Integración entre Servicios de AWS | La integración entre los servicios de AWS es excelente, con flujos de trabajo bien definidos. | La integración es buena pero podría ser más eficiente. | La integración es aceptable pero puede tener algunas deficiencias. | La integración es inadecuada y no se completan los flujos de trabajo correctamente. | |
| Presentación Visual con Iconos de AWS | Los iconos estándar de AWS están bien utilizados y la propuesta es visualmente clara y bien estructurada. | Los iconos están utilizados pero la claridad visual puede mejorar. | Los iconos están poco utilizados o mal colocados, lo que reduce la claridad visual. | Los iconos no están utilizados y la propuesta es confusa visualmente. |
Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.
Las pruebas que hemos llevado a cabo para el procesamiento de datos son las siguiente:
Para asegurar que los datos llegan correctamente y se guardan y organizan en las capas "Bronze" y "Silver" en S3 llevamos a cabo los siguientes pasos:
Para asegurar que la transformación de los datos llevadas a cabo por Glue Jobs se ejecutan de forma adecuada, realizamos los siguientes pasos:
Para asegurar que las consultas SQL realizadas sobre los datos procesados son precisas y retornan los resultados esperados, llevaremos a cabo los siguientes pasos:
El crawler de AWS determina el esquema de la tabla automáticamente. Debemos asegurarnos que la estructura Json del mismo incluya todas las posibles combinaciones para que no tenga problemas con nuevos datos. Hasta el momento de la entrega hemos obtenido 110.244 registros en la capa bronze.
CREATE EXTERNAL TABLE `raw_events_actividad_3`(
`event` STRUCT<
event_version:STRING,
event_id:STRING,
event_type:STRING,
event_name:STRING,
event_timestamp:INT,
app_version:STRING,
event_data:STRUCT<
item_id:STRING,
item_version:INT,
platform:STRING,
last_login_time:INT,
last_screen_seen:STRING,
level_id:STRING,
level_version:INT,
tutorial_screen_id:STRING,
tutorial_screen_version:INT,
report_id:STRING,
report_reason:STRING,
user_rating:INT,
match_id:STRING,
map_id:STRING,
match_type:STRING,
user_rank_reached:STRING,
lootbox_id:STRING,
lootbox_cost:INT,
item_rarity:STRING,
item_cost:INT,
match_result_type:STRING,
exp_gained:INT,
most_used_spell:STRING,
item_amount:INT,
currency_type:STRING,
country_id:STRING,
currency_amount:INT,
transaction_id:STRING,
matched_slots:INT,
spell_id:STRING,
matching_failed_msg:STRING
>
) COMMENT 'from deserializer',
`application_id` STRING COMMENT 'from deserializer'
)
PARTITIONED BY (
`partition_0` STRING,
`partition_1` STRING,
`partition_2` STRING,
`partition_3` STRING
)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'paths'='application_id,event'
)
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://raw-events-actividad-3/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'UPDATED_BY_CRAWLER'='crawler-actividad3',
'averageRecordSize'='1042',
'classification'='json',
'compressionType'='none',
'objectCount'='39',
'partition_filtering.enabled'='true',
'recordCount'='184420',
'sizeKey'='192945298',
'typeOfData'='file'
);
Vemos que las tablas de los eventos procesados están correctamente guardadas se puede consultar sus datos e incluso conectarlos con Quicksight para su visualización.
Para segurara que los datos estan llegando correcatmente a las capas finales y que se pueden representar visualmente, llevaremos a cabo un pequeño Dashboard mediante Quicksight.
Para aseguraros de que los KPIs (Key Performance Indicators) definidos están correctamente medidos y reflejan fielmente la realidad del análisis, es necesario seguir un proceso riguroso y meticuloso. A continuación, se describen los pasos y métodos utilizados para verificar la precisión y la relevancia de los KPIs:
Siguiendo estos pasos, podréis validar que los KPIs definidos están correctamente medidos y reflejan fielmente la realidad del análisis, garantizando así que proporcionen información útil y accionable para la toma de decisiones.
| Ítem | Excelente (4) | Bueno (3) | Satisfactorio (2) | Necesita Mejora (1) |
|---|---|---|---|---|
| Definición Clara y Precisa de los KPIs | Los KPIs están claramente definidos, alineados con los objetivos del proyecto y son fácilmente medibles. | Los KPIs están bien definidos y alineados, pero podría haber mejoras en la medibilidad. | Los KPIs están parcialmente definidos y alineados, pero con carencias significativas en la medibilidad. | Los KPIs no están claramente definidos ni alineados con los objetivos del proyecto. |
| Recopilación de Datos Consistente | La recopilación de datos es sistemática y coherente, utilizando fuentes fiables y métodos adecuados. | La recopilación de datos es generalmente consistente, pero con algunas omisiones o errores menores. | La recopilación de datos es aceptable, pero hay inconsistencias o carencias en los métodos utilizados. | La recopilación de datos no es consistente ni sistemática, con fuentes y métodos inadecuados. |
| Validación de la Precisión | Las auditorías de datos son periódicas y detalladas, asegurando la precisión con comparaciones adecuadas con estándares. | Las auditorías de datos son regulares, pero podrían ser más detalladas o frecuentes. | Las auditorías de datos son ocasionales y con detalles insuficientes, afectando la precisión. | No se realizan auditorías de datos adecuadas, y la precisión es inadecuada. |
| Análisis de la Relevancia | Los análisis de correlación y el feedback continuo de los stakeholders aseguran que los KPIs son relevantes. | Los análisis y el feedback son buenos, pero podrían ser más completos o frecuentes. | Los análisis de correlación son superficiales, y el feedback no se recoge de manera consistente. | No se realizan análisis de correlación ni se recoge feedback adecuadamente. |
| Revisión y Ajuste Continuo | Los KPIs se revisan y ajustan regularmente para asegurar que continúan siendo relevantes y eficientes. | La revisión y ajuste de los KPIs se hace periódicamente, pero con algunas carencias. | La revisión de los KPIs es infrecuente y los ajustes son insuficientes. | No se realizan revisiones ni ajustes regulares de los KPIs. |
Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.
Los KPI que hemos seleccionado para el análisis, y que son frecuentes en game analytics son:
Y para su validación hemos llevado a cabo estos pasos:
| Grupo | KPI | Detalles |
|---|---|---|
| Desempeño Financiero | ARPU (Average Revenue Per User) | Objetivo Específico: Medir el ingreso promedio generado por cada usuario en un período determinado. Medible y Cuantificable: Se calcula dividiendo el ingreso total generado por el número total de usuarios. |
| LTV (Lifetime Value) | Objetivo Específico: Medir el valor total que un usuario genera a lo largo de su ciclo de vida en la app. Medible y Cuantificable: Se calcula con modelos predictivos basados en ingresos promedio y duración estimada de actividad del usuario. |
|
| Descargas e Instalaciones | INSTALLS | Objetivo Específico: Medir el número total de veces que la aplicación ha sido descargada e instalada. Medible y Cuantificable: Se mide utilizando datos de las plataformas de distribución (App Store, Google Play, etc.). |
| CPI (Cost Per Install) | Objetivo Específico: Medir el costo que se incurre por cada instalación de la aplicación Medible y Cuantificable: Se calcula dividiendo el costo total de las campañas de adquisición entre el número total de instalaciones generadas. |
|
| Actividad y Participación | DAU (Daily Active Users) | Objetivo Específico: Medir el número de usuarios activos en un día específico. Medible y Cuantificable: Se mide directamente a través de eventos de inicio de sesión o sesión ( session_start). |
| Session Length | Objetivo Específico: Medir el tiempo promedio que los usuarios pasan dentro de la aplicación por sesión. Medible y Cuantificable: Se mide calculando la duración promedio entre el inicio y el final de cada sesión registrada. |
|
| Conversión | Conversion Rate | Objetivo Específico: Medir el porcentaje de usuarios que completan una acción específica deseada. Medible y Cuantificable: Se calcula dividiendo el número de usuarios que completan la acción por el número total de usuarios que iniciaron el proceso. |
| Funnel Conversion Rate | Objetivo Específico: Medir el porcentaje de usuarios que completan una acción específica deseada. Medible y Cuantificable: Se mide calculando el porcentaje de usuarios que pasan de una etapa a la siguiente dentro del embudo. |
Auditoría de Datos: Es importante hacer auditorías periódicas para verificar que los eventos estén correctamente registrados. En nuestro caso, es esencial verificar que no se este perdiendo o duplicando información, ya que nos estaría falsificando la información. Por eso es necesario que verifiquemos constantement que la información que ellega es correcta y que los datos se esten transofrmando correctamente a medida que avanza el procesamiento de datos.
Comparación con Estándares: Comparar los KPIs con benchmarks de la industria del juego móvil o con datos históricos del juego para asegurarse de que los valores de los KPIs sean razonables y estén alineados con lo esperado.
Analisis de correlación: | Grupo | KPI | Detalles | |:-------------------------|:----------------------------|:------------------------------------------------------------------------------------------------| | Desempeño Financiero | ARPU (Average Revenue Per User) | Analizar si el ARPU correlaciona con la rentabilidad general de la app y las estrategias de monetización. | | | LTV (Lifetime Value) | Verificar que el LTV esté alineado con los objetivos de crecimiento a largo plazo. | | Descargas e Instalaciones | INSTALLS | Confirmar que el número de instalaciones tenga un impacto directo en la base de usuarios activos. | | | CPI (Cost Per Install) | Validar que un CPI más bajo se traduzca en un crecimiento rentable de la base de usuarios. | | Actividad y Participación | DAU (Daily Active Users) | Correlacionar el DAU con otros KPIs como ingresos o tiempo de sesión. | | | Session Length | Determinar si sesiones más largas conducen a una mayor monetización o retención. | | Conversión | Conversion Rate | Asegurar que una tasa de conversión más alta resulte en mayores ingresos o valor agregado. | | | Funnel Conversion Rate | Analizar cada etapa del embudo para identificar puntos críticos donde se pierdan usuarios. |
Feedback Continuo: Hay que ir recopliando el feedback de los steakholders contantement para asegurar que los KPI sean relevantes. En nuestro caso, es relevante los ingresos que generan los usuarios, así como el tiempo que permanecen activos en el juego.
| Grupo | KPI | Detalles |
|---|---|---|
| Desempeño Financiero | ARPU (Ingresos Promedio por Usuario) | Análisis y ajuste del ARPU en función de cambios en ingresos o estructura de usuarios. |
| LTV (Valor de Vida del Cliente) | Actualización de cálculos del LTV basados en cambios en estrategias de retención o monetización. | |
| Descargas e Instalaciones | INSTALACIONES | Revisión de métricas tras la implementación de campañas de adquisición. |
| CPI (Costo por Instalación) | Ajustes en el CPI según los impactos de las estrategias de adquisición. | |
| Actividad y Participación | DAU (Usuarios Activos Diarios) | Ajuste de la definición de DAU con la introducción de nuevos eventos significativos. |
| Duración de la Sesión | Modificación de los criterios ante cambios en los patrones de uso detectados. | |
| Conversión | Tasa de Conversión | Adaptación de la definición de la tasa de conversión según nuevos objetivos. |
| Tasa de Conversión del Embudo | Análisis de las tasas de conversión tras rediseñar el embudo. |
Incluid los scripts o código más relevantes que han sido utilizados durante la implementación del proyecto. Aseguraos de que están suficientemente descritos para facilitar la comprensión de las operaciones realizadas.
Durante la implementación del proyecto en el AWS Academy Learner Lab, es importante incorporar y documentar los scripts o código utilizados. Es necesario detallar los siguientes apartados:
| Ítem | Excelente (4) | Bueno (3) | Satisfactorio (2) | Necesita Mejora (1) |
|---|---|---|---|---|
| Selección de los Scripts Relevantes | Los scripts seleccionados son esenciales y priorizan el impacto directo en los objetivos del proyecto. | Los scripts seleccionados son relevantes pero podrían incluir más componentes clave. | La selección de scripts es básica e incluye algunos componentes clave pero omite otros importantes. | La selección de scripts es inadecuada y no incluye los componentes esenciales. |
| Descripción de los Scripts | La descripción del propósito, funcionamiento y dependencias es muy clara y detallada. | La descripción es clara pero puede faltar algún detalle menor. | La descripción es aceptable pero faltan detalles importantes para la comprensión completa. | La descripción es confusa o inadecuada, con muchos detalles importantes omitidos. |
| Ejemplos de Código | Los ejemplos de código incluyen fragmentos bien comentados y casos de uso prácticos y relevantes. | Los ejemplos de código son buenos pero podrían incluir más comentarios o casos de uso. | Los ejemplos de código son aceptables pero con comentarios o casos de uso insuficientes. | Los ejemplos de código son inadecuados, con comentarios o casos de uso mínimos o inexistentes. |
| Documentación | Las guías son detalladas y proporcionan soluciones a errores comunes de manera clara y efectiva. | Las guías son buenas pero podrían ser más detalladas o incluir más soluciones. | Las guías son aceptables pero con información insuficiente sobre la implementación y resolución de problemas. | Las guías son inadecuadas y no proporcionan información útil para la implementación. |
| Revisión y Validación | Las pruebas de funcionalidad son completas y el feedback es recogido e implementado para mejorar el código. | Las pruebas de funcionalidad son buenas pero podrían ser más completas. | Las pruebas de funcionalidad son aceptables pero con deficiencias en algunos aspectos. | Las pruebas de funcionalidad son inadecuadas y no se recoge ni implementa feedback. |
Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.
A continuación se encuentran los dos scripts más relevantes del proyecto y sus explicaciones.
En primer lugara para el script que genera los datos del proyecto, publish_data.py:
| Apartado | Descripción |
|---|---|
| Objetivo | Creación de los eventos del juego que se van a analizar en el proyecto y evniarlos a Kinesis para publicar archivos simulando un productor de datos. |
| Funcionamiento | Simula la generación de eventos parecidos a los que se enviarían desde una aplicación móvil y los envía a Kinesis. |
| Entradas | Nombre del Data Stream, Región de AWS, ID de la aplicación. |
| Salidas | Eventos enviados a Kinesis publicados en batches de tamaño batch_size. |
| Dependencias | os, argparse, boto3. Utiliza el servicio Amazon S3. |
| Fragmento de Código | |
| Ejemplo de Uso | Este fragmento genera un evento aleatorio y lo envía a un stream de Kinesis llamado game-analytics-stream. |
| Guía de Implementación | 1. Configura credenciales de AWS. 2. Define bucket y ruta fuente. 3. Ejecuta el script proporcionando argumentos necesarios. |
| Resolución de Problemas | - Error de credenciales: Verifica AWS_ACCESS_KEY y AWS_SECRET_ACCESS_KEY. - Archivos no encontrados: Confirma rutas locales. |
| Pruebas | - Realiza pruebas con buckets y rutas de prueba. - Valida los archivos subidos en S3. |
| Feedback | Se recopila retroalimentación para mejorar el funcionamiento y la documentación. |
Y en segundo lugar silver_glue_job.py:
| Apartado | Descripción |
|---|---|
| Objetivo | Procesar datos almacenados en S3 utilizando AWS Glue y transformarlos en tablas aptas para Athena. |
| Funcionamiento | Usa pyspark para leer, procesar y escribir datos en formato Parquet en S3. |
| Entradas | Ruta de datos en S3. |
| Salidas | Datos procesados almacenados en un bucket S3 en formato Parquet. |
| Dependencias | pyspark, boto3, AWS Glue. |
| Fragmento de Código | |
| Ejemplo de Uso | Filtrar eventos específicos, seleccionar y transformar columnas, eliminar duplicados y eliminar datos. |
| Guía de Implementación | 1. Configura el entorno Glue en AWS. 2. Asegúrate de que los datos estén disponibles en S3. 3. Crea un trabajo Glue con el script. |
| Resolución de Problemas | - Errores en schema: Ajusta definiciones de DynamicFrame. - Permisos S3: Verifica políticas del bucket. |
| Pruebas | Procesa un dataset pequeño y valida que la estructura resultante sea correcta. |
| Feedback | Se recopila retroalimentación para mejorar el funcionamiento y la documentación. |
Este script genera y envía datos aleatorios o predefinidos a un flujo de Kinesis en AWS. Es útil para realizar pruebas de ingestión de datos en tiempo real y para desarrollar aplicaciones de análisis de datos basadas en Kinesis.
El script requiere las siguientes bibliotecas de Python:
boto3: Cliente de AWS SDK para Python.json: Para la manipulación de datos JSON.random: Para generar datos aleatorios.time: Para trabajar con marcas de tiempo.uuid: Para generar identificadores únicos.argparse: Para manejar argumentos de línea de comandos.numpy: Para operaciones avanzadas con datos.access_keys: Archivo que contiene las claves de acceso AWS (ACCESS_KEY y SECRET_KEY).Instale las dependencias usando:
pip install boto3 numpy
El script se ejecuta desde la línea de comandos y acepta varios argumentos configurables. Por defecto, genera eventos aleatorios y los envía indefinidamente al flujo de Kinesis.
python publish_data.py --region <REGIÓN_AWS> --stream-name <NOMBRE_FLUJO> --application-id <ID_APLICACIÓN>
| Parámetro | Requerido | Descripción |
|---|---|---|
--region |
Sí | Región AWS donde se encuentra el flujo de Kinesis. |
--stream-name |
Sí | Nombre del flujo de Kinesis al que se enviarán los datos. |
--application-id |
Sí | Identificador de la aplicación que envía los datos. |
--batch-size |
No | Tamaño del lote de eventos enviados en cada solicitud. Valor por defecto: 5. |
--input-filename |
No | Archivo de entrada con eventos JSON predefinidos, uno por línea. Si se usa, el script termina al procesarlo. |
El script se organiza en las siguientes secciones principales:
Importación de Módulos
Importa bibliotecas necesarias como boto3, json, uuid, time, random, y otras para la generación y transmisión de datos.
Constantes y Valores Predeterminados
Define valores predeterminados para los eventos y parámetros, como DEFAULT_EVENT_VERSION y DEFAULT_BATCH_SIZE.
Funciones
parse_cmd_line: Analiza los argumentos de la línea de comandos. getUUIDs: Genera una lista de identificadores UUID. getEventType: Selecciona un tipo de evento aleatorio basado en probabilidades preconfiguradas. getEvent: Crea un evento aleatorio utilizando datos predefinidos. generate_event: Combina datos generales y específicos para crear un evento completo. send_record_batch: Envía un lote de eventos al flujo de Kinesis. send_events_infinite: Envía eventos en un bucle infinito, generándolos o leyendo desde un archivo.Configuración Principal y Ejecución
boto3. send_events_infinite para iniciar el envío de eventos al flujo.Ejecución Condicional
Utiliza if __name__ == '__main__': para garantizar que el script solo se ejecute directamente y no como un módulo importado.
# Publish Data Generador de datos provisto por AWS .
import boto3 # type: ignore
import json
import random
from random import choice
import time
from datetime import datetime
import uuid
import os
import numpy
import argparse
from access_keys import ACCESS_KEY, SECRET_KEY
# Event Payload defaults
DEFAULT_EVENT_VERSION = '1.0.0'
DEFAULT_BATCH_SIZE = 5
def parse_cmd_line():
"""Parse the command line and extract the necessary values."""
parser = argparse.ArgumentParser(description='Send data to a Kinesis stream for analytics. By default, the script '
'will send events infinitely. If an input file is specified, the '
'script will instead read and transmit all of the events contained '
'in the file and then terminate.')
# REQUIRED arguments
kinesis_regions = boto3.session.Session().get_available_regions('kinesis')
parser.add_argument('--region', required=True, choices=kinesis_regions, type=str,
dest='region_name', metavar='kinesis_aws_region',
help='The AWS region where the Kinesis stream is located.')
parser.add_argument('--stream-name', required=True, type=str, dest='stream_name',
help='The name of the Kinesis stream to publish to. Must exist in the specified region.')
parser.add_argument('--application-id', required=True, type=str, dest='application_id',
help='The application_id to use when submitting events to ths stream (i.e. You can use the default application for testing).')
# OPTIONAL arguments
parser.add_argument('--batch-size', type=int, dest='batch_size', default=DEFAULT_BATCH_SIZE,
help='The number of events to send at once using the Kinesis PutRecords API.')
parser.add_argument('--input-filename', type=str, dest='input_filename',
help='Send events from a file rather than randomly generate them. The format of the file'
' should be one JSON-formatted event per line.')
return parser.parse_args()
# Returns array of UUIDS. Used for generating sets of random event data
def getUUIDs(dataType, count):
uuids = []
for i in range(0, count):
uuids.append(str(uuid.uuid4()))
return uuids
# Randomly choose an event type from preconfigured options
def getEventType():
event_types = {
1: 'user_registration',
2: 'user_knockout',
3: 'item_viewed',
4: 'iap_transaction',
5: 'login',
6: 'logout',
7: 'tutorial_progression',
8: 'user_rank_up',
9: 'matchmaking_start',
10: 'matchmaking_complete',
11: 'matchmaking_failed',
12: 'match_start',
13: 'match_end',
14: 'level_started',
15: 'level_completed',
16: 'level_failed',
17: 'lootbox_opened',
18: 'user_report',
19: 'user_sentiment'
}
return event_types[numpy.random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], 1, p=[0.04, 0.05, 0.18, 0.02, 0.1, 0.06, 0.04, 0.03, 0.025, 0.025, 0.01, 0.03, 0.03, 0.08, 0.08, 0.08, 0.04, 0.04, 0.04])[0]]
# Generate a randomized event from preconfigured sample data
def getEvent(event_type, SERVERS, MATCHES):
levels = [
'1',
'2',
'3',
'4',
'5'
]
countries = [
'UNITED STATES',
'UK',
'JAPAN',
'SINGAPORE',
'AUSTRALIA',
'BRAZIL',
'SOUTH KOREA',
'GERMANY',
'CANADA',
'FRANCE'
]
items = getUUIDs('items', 10)
currencies = [
'USD',
'EUR',
'YEN',
'RMB'
]
platforms = [
'nintendo_switch',
'ps4',
'xbox_360',
'iOS',
'android',
'pc',
'fb_messenger'
]
tutorial_screens = [
'1_INTRO',
'2_MOVEMENT',
'3_WEAPONS',
'4_FINISH'
]
match_types = [
'1v1',
'TEAM_DM_5v5',
'CTF'
]
matching_failed_msg = [
'timeout',
'user_quit',
'too_few_users'
]
maps = [
'WAREHOUSE',
'CASTLE',
'AIRPORT'
]
game_results = [
'WIN',
'LOSE',
'KICKED',
'DISCONNECTED',
'QUIT'
]
spells = [
'WATER',
'EARTH',
'FIRE',
'AIR'
]
ranks = [
'1_BRONZE',
'2_SILVER',
'3_GOLD',
'4_PLATINUM',
'5_DIAMOND',
'6_MASTER'
]
item_rarities = [
'COMMON',
'UNCOMMON',
'RARE',
'LEGENDARY'
]
report_reasons = [
'GRIEFING',
'CHEATING',
'AFK',
'RACISM/HARASSMENT'
]
switcher = {
'login': {
'event_data': {
'platform': str(numpy.random.choice(platforms, 1, p=[0.2, 0.1, 0.3, 0.15, 0.1, 0.05, 0.1])[0]),
'last_login_time': int(time.time())-random.randint(40000,4000000)
}
},
'logout': {
'event_data': {
'last_screen_seen': 'the last screen'
}
},
'client_latency': {
'event_data': {
'latency': numpy.random.choice((random.randint(40,185),1)),
'connected_server_id': str(random.choice(SERVERS)),
'region': str(random.choice(countries))
}
},
'user_registration': {
'event_data': {
'country_id': str(numpy.random.choice(countries, 1, p=[0.3, 0.1, 0.2, 0.05, 0.05, 0.02, 0.15, 0.05, 0.03, 0.05])[0]),
'platform': str(numpy.random.choice(platforms, 1, p=[0.2, 0.1, 0.3, 0.15, 0.1, 0.05, 0.1])[0])
}
},
'user_knockout': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'map_id': str(numpy.random.choice(maps, 1, p=[0.6, 0.3, 0.1])[0]),
'spell_id': str(numpy.random.choice(spells, 1, p=[0.1, 0.4, 0.3, 0.2])[0]),
'exp_gained': random.randint(1,2)
}
},
'item_viewed': {
'event_data': {
'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
'item_version': random.randint(1,2)
}
},
'iap_transaction': {
'event_data': {
'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
'item_version': random.randint(1,2),
'item_amount': random.randint(1,4),
'currency_type': str(numpy.random.choice(currencies, 1, p=[0.7, 0.15, 0.1, 0.05])[0]),
'country_id': str(numpy.random.choice(countries, 1, p=[0.3, 0.1, 0.2, 0.05, 0.05, 0.02, 0.15, 0.05, 0.03, 0.05])[0]),
'currency_amount': random.randint(1,10),
'transaction_id': str(uuid.uuid4())
}
},
'tutorial_progression': {
'event_data': {
'tutorial_screen_id': str(numpy.random.choice(tutorial_screens, 1, p=[0.3, 0.3, 0.2, 0.2])[0]),
'tutorial_screen_version': random.randint(1,2)
}
},
'user_rank_up': {
'event_data': {
'user_rank_reached': str(numpy.random.choice(ranks, 1, p=[0.25, 0.35, 0.2, 0.15, 0.0499, 0.0001])[0])
}
},
'matchmaking_start': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'match_type': str(numpy.random.choice(match_types, 1, p=[0.4, 0.3, 0.3])[0])
}
},
'matchmaking_complete': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'match_type': str(numpy.random.choice(match_types, 1, p=[0.6, 0.2, 0.2])[0]),
'matched_slots': random.randrange(start=6, stop=10)
}
},
'matchmaking_failed': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'match_type': str(numpy.random.choice(match_types, 1, p=[0.35, 0.2, 0.45])[0]),
'matched_slots': random.randrange(start=1, stop=10),
'matching_failed_msg': str(numpy.random.choice(matching_failed_msg, 1, p=[0.35, 0.2, 0.45])[0])
}
},
'match_start': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'map_id': str(numpy.random.choice(maps, 1, p=[0.3, 0.3, 0.4])[0])
}
},
'match_end': {
'event_data': {
'match_id': str(random.choice(MATCHES)),
'map_id': str(numpy.random.choice(maps, 1, p=[0.3, 0.3, 0.4])[0]),
'match_result_type': str(numpy.random.choice(game_results, 1, p=[0.4, 0.4, 0.05, 0.05, 0.1])[0]),
'exp_gained': random.randrange(start=100, stop=200),
'most_used_spell': str(numpy.random.choice(spells, 1, p=[0.1, 0.4, 0.2, 0.3])[0])
}
},
'level_started': {
'event_data': {
'level_id': str(numpy.random.choice(levels, 1, p=[0.2, 0.2, 0.2, 0.2, 0.2])[0]),
'level_version': random.randint(1,2)
}
},
'level_completed': {
'event_data': {
'level_id': str(numpy.random.choice(levels, 1, p=[0.6, 0.2, 0.12, 0.05, 0.03])[0]),
'level_version': random.randint(1,2)
}
},
'level_failed': {
'event_data': {
'level_id': str(numpy.random.choice(levels, 1, p=[0.001, 0.049, 0.05, 0.3, 0.6])[0]),
'level_version': random.randint(1,2)
}
},
'lootbox_opened': {
'event_data': {
'lootbox_id': str(uuid.uuid4()),
'lootbox_cost': random.randint(2,5),
'item_rarity': str(numpy.random.choice(item_rarities, 1, p=[0.5, 0.3, 0.17, .03])[0]),
'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
'item_version': random.randint(1,2),
'item_cost': random.randint(1,5)
}
},
'user_report': {
'event_data': {
'report_id': str(uuid.uuid4()),
'report_reason': str(numpy.random.choice(report_reasons, 1, p=[0.2, 0.5, 0.1, 0.2])[0])
}
},
'user_sentiment': {
'event_data': {
'user_rating': random.randint(1,5)
}
}
}
return switcher[event_type]
# Take an event type, get event data for it and then merge that event-specific data with the default event fields to create a complete event
def generate_event():
SERVERS = getUUIDs('str', 300)
MATCHES = getUUIDs('str', 300)
event_type = getEventType()
# Within the demo script the event_name is set same as event_type for simplicity.
# In many use cases multiple events could exist under a common event type which can enable you to build a richer data taxonomy.
event_name = event_type
event_data = getEvent(event_type, MATCHES=MATCHES, SERVERS=SERVERS)
event = {
'event_version': DEFAULT_EVENT_VERSION,
'event_id': str(uuid.uuid4()),
'event_type': event_type,
'event_name': event_name,
'event_timestamp': int(time.time()),
'app_version': str(numpy.random.choice(['1.0.0', '1.1.0', '1.2.0'], 1, p=[0.05, 0.80, 0.15])[0])
}
event.update(event_data)
return event
def send_record_batch(kinesis_client, stream_name, raw_records):
"""Send a batch of records to Amazon Kinesis."""
# Translate input records into the format needed by the boto3 SDK
formatted_records = []
for rec in raw_records:
formatted_records.append({'PartitionKey': rec['event']['event_name'], 'Data': json.dumps(rec)})
kinesis_client.put_records(StreamName=stream_name, Records=formatted_records)
print('Sent %d records to stream %s.' % (len(formatted_records), stream_name))
def send_events_infinite(kinesis_client, stream_name, batch_size, application_id):
"""Send a batches of randomly generated events to Amazon Kinesis."""
while True:
records = []
# Create a batch of random events to send
for i in range(0, batch_size):
event_dict = generate_event()
record = {
'event': event_dict,
'application_id': application_id
}
records.append(record)
send_record_batch(kinesis_client, stream_name, records)
time.sleep(random.randint(1,7))
if __name__ == '__main__':
args = parse_cmd_line()
aws_region = args.region_name
kinesis_stream = args.stream_name
batch_size = args.batch_size or DEFAULT_BATCH_SIZE
application_id = args.application_id
print('===========================================')
print('CONFIGURATION PARAMETERS:')
print('- KINESIS_STREAM: ' + kinesis_stream)
print('- AWS_REGION: ' + aws_region)
print('- APPLICATION_ID: ' + application_id)
SERVERS = getUUIDs('servers', 3)
MATCHES = getUUIDs('matches', 50)
print('===========================================\n')
session = boto3.Session()
client = session.client('kinesis', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY ,region_name=aws_region)
send_events_infinite(client, kinesis_stream, batch_size, application_id)
Este script realiza el procesamiento de datos desde la capa Raw hacia la capa Silver en un entorno de AWS Glue, transformando y estructurando la información para facilitar análisis posteriores. A continuación, se describe su funcionamiento en detalle.
El propósito de este script es filtrar, transformar y almacenar datos en la capa Silver. Se enfoca en cuatro tipos principales de eventos:
lootbox_opened level_started, level_completed, level_failed iap_transaction match_start, match_end Los datos resultantes se guardan en formato Parquet en Amazon S3, organizados por particiones de tiempo (año, mes, día).
El script utiliza bibliotecas clave para el procesamiento:
from_unixtime para convertir marcas de tiempo de Unix a formatos legibles.Job para rastrear y gestionar el trabajo en Glue.actividad_3 y tabla raw_events_actividad_3).DynamicFrame, que luego se convierte en un DataFrame para realizar transformaciones avanzadas.Se procesan los datos según el tipo de evento:
lootbox_opened)¶lootbox_opened.event_datetime) y elimina duplicados.level_started, level_completed, level_failed)¶iap_transaction)¶match_start, match_end)¶job.commit().El script genera los siguientes conjuntos de datos procesados en la capa Silver:
| Nombre de Salida | Tipo de Evento | Ubicación en S3 |
|---|---|---|
lootbox_open |
Eventos de apertura de LootBoxes | s3://capa-silver-gameanalytics/lootbox_open/ |
levels |
Eventos de niveles (iniciados, terminados) | s3://capa-silver-gameanalytics/levels/ |
match |
Eventos de inicio y fin de partidas | s3://capa-silver-gameanalytics/match/ |
iaps |
Transacciones dentro de la aplicación | s3://capa-silver-gameanalytics/iaps/ |
year, month y day para mejorar el rendimiento de las consultas.El script se organiza en las siguientes secciones principales:
Importación de Módulos
Importa las bibliotecas necesarias para el procesamiento de datos en AWS Glue y Apache Spark, como awsglue, pyspark.sql.functions y otras.
Inicialización del Contexto de Glue
Configura el contexto de Glue y Spark, incluyendo la creación de un objeto Job para la gestión del trabajo.
Lectura de Datos desde la Capa Raw
create_dynamic_frame.from_catalog. DynamicFrame en un DataFrame de Spark para mayor flexibilidad en el procesamiento.Procesamiento por Tipo de Evento
lootbox_opened, level_started, iap_transaction, match_start, match_end). event_datetime.Guardado de los Datos Procesados en la Capa Silver
data_to_save que organiza los DataFrames procesados por tipo de evento. DynamicFrame para guardarlo en S3. Commit del Trabajo
Finaliza el trabajo con job.commit() para asegurar que se registre correctamente en AWS Glue.
## Procesamiento de la capa Silver
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
dyf = glueContext.create_dynamic_frame.from_catalog(database='actividad_3', table_name='raw_events_actividad_3')
dyf.printSchema()
df = dyf.toDF()
# Ejemplo de LootBox ID
loot_box_df = (
df.where("event.event_type == 'lootbox_opened'")
.select(
"event.event_type",
# Convert Unix epoch timestamp to readable datetime
from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
"event.event_data.lootbox_id",
"event.event_data.lootbox_cost",
"event.event_data.item_cost",
col("partition_0").alias("year"),
col("partition_1").alias("month"),
col("partition_2").alias("day")
)
.dropDuplicates()
.orderBy(col("event_datetime"))
)
level_df = (
df.where("event.event_type in ('level_started', 'level_completed', 'level_failed')")
.select(
"event.event_type",
# Convert Unix epoch timestamp to readable datetime
from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
"event.event_data.level_id",
"event.event_data.level_version",
col("partition_0").alias("year"),
col("partition_1").alias("month"),
col("partition_2").alias("day")
)
.dropDuplicates()
.orderBy(col("event_datetime"))
)
iap_df = (
df.where("event.event_type in ('iap_transaction')")
.select(
"event.event_type",
# Convert Unix epoch timestamp to readable datetime
from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
"event.event_data.item_id",
"event.event_data.item_version",
"event.event_data.item_amount",
"event.event_data.currency_type",
"event.event_data.country_id",
"event.event_data.currency_amount",
"event.event_data.transaction_id",
col("partition_0").alias("year"),
col("partition_1").alias("month"),
col("partition_2").alias("day")
)
.dropDuplicates()
.orderBy(col("event_datetime"))
)
match_df = (
df.where("event.event_type in ('match_start', 'match_end')")
.select(
"event.event_type",
# Convert Unix epoch timestamp to readable datetime
from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
"event.event_data.match_id",
"event.event_data.map_id",
"event.event_data.match_result_type",
"event.event_data.exp_gained",
"event.event_data.most_used_spell",
col("partition_0").alias("year"),
col("partition_1").alias("month"),
col("partition_2").alias("day")
)
.dropDuplicates()
.orderBy(col("event_datetime"))
)
# Save Data
data_to_save = {
'lootbox_open': loot_box_df,
'levels': level_df,
'match': match_df,
'iaps': iap_df
}
for name, df in data_to_save.items():
dyf = DynamicFrame.fromDF(df, glueContext, name)
s3output = glueContext.getSink(
path=f"s3://capa-silver-gameanalytics/{name}/",
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["year", "month", "day"],
compression="snappy",
enableUpdateCatalog=True,
transformation_ctx="s3output",
)
# Update Glue Catalog Table Information
s3output.setCatalogInfo(
catalogDatabase="silver",
catalogTableName=name
)
# Set Output Format
s3output.setFormat("glueparquet")
# Write DynamicFrame to S3
s3output.writeFrame(dyf)
job.commit()